package websocket

import (
	"sync"
	"time"

	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/net/ghttp"
	"github.com/gorilla/websocket"
)

// MessageQueue is a single queued delivery: an already-encoded payload
// together with the connections it should be written to.
type MessageQueue struct {
	connections []*ghttp.WebSocket // target connections
	text        []byte             // encoded message
}

// queues is the buffered channel (capacity 1024) through which pending
// messages are handed to the sending loop.
var queues = make(chan *MessageQueue, 1024)

// SendMessageToSessions collects the connection of every session in sessions
// that currently has one and passes them, with msg, to
// SendMessageToConnections. Sessions whose GetConn returns nil are skipped.
func SendMessageToSessions(sessions map[uint64]*Session, msg *MessageRes) {
	// At most one connection per session, so the map size is an upper bound.
	cs := make([]*ghttp.WebSocket, 0, len(sessions))
	for _, sess := range sessions {
		// Append the value that was just nil-checked instead of calling
		// GetConn a second time: the second call is redundant and is not
		// guaranteed to return the same (non-nil) result as the first.
		if conn := sess.GetConn(); conn != nil {
			cs = append(cs, conn)
		}
	}
	SendMessageToConnections(cs, msg)
}

// SendMessageToSession sends msg to the connection of a single session.
// It does nothing when the session's GetConn returns nil.
func SendMessageToSession(session *Session, msg *MessageRes) {
	conn := session.GetConn()
	if conn == nil {
		return
	}
	targets := []*ghttp.WebSocket{conn}
	SendMessageToConnections(targets, msg)
}

// SendMessageToConnections encodes msg with gjson.Encode and forwards the
// resulting bytes to SendTextMessageToConnections for the given connections.
// If encoding fails, the error is logged and nothing is sent.
func SendMessageToConnections(cs []*ghttp.WebSocket, msg *MessageRes) {
	text, err := gjson.Encode(msg)
	if err != nil {
		g.Log().Error(`消息格式化错误`, err)
		return
	}
	SendTextMessageToConnections(cs, text)
}

// SendTextMessageToConnections wraps the connections and the pre-encoded text
// in a MessageQueue and sends it on the queues channel. Because this is a
// plain channel send, the call blocks while the channel's buffer is full.
func SendTextMessageToConnections(connections []*ghttp.WebSocket, text []byte) {
	item := &MessageQueue{
		connections: connections,
		text:        text,
	}
	queues <- item
}

// loopSending receives queued messages from the queues channel and writes
// each one, as a websocket text message, to every non-nil connection listed
// in it. Writes for one message run in parallel (one goroutine per
// connection) and share a single write deadline of 10 seconds from the moment
// the message is dequeued; the next message is not taken from the channel
// until all writes for the current one have returned. The loop ends only if
// queues is closed.
func (s *server) loopSending() {
	// Ranging over the channel is equivalent to the former single-case
	// select, but terminates cleanly instead of dereferencing a nil message
	// if queues is ever closed.
	for msg := range queues {
		deadline := time.Now().Add(10 * time.Second)

		var wg sync.WaitGroup
		for _, conn := range msg.connections {
			if conn == nil {
				continue
			}
			wg.Add(1)
			go func(conn *ghttp.WebSocket, text []byte) {
				defer wg.Done()
				defer func() {
					// A panic while writing to one connection must not
					// take down the sending loop, but it should not
					// vanish silently either.
					if r := recover(); r != nil {
						g.Log().Error(`websocket send panic recovered`, r)
					}
				}()
				// Delivery is deliberately best-effort: errors from setting
				// the deadline or writing are ignored here, as before.
				_ = conn.SetWriteDeadline(deadline)
				_ = conn.WriteMessage(websocket.TextMessage, text)
			}(conn, msg.text)
		}
		wg.Wait()
	}
}
